/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.transaction.core.coordinator;

import com.lvyh.lightframe.transaction.common.context.TransactionContext;
import com.lvyh.lightframe.transaction.common.domain.Participant;
import com.lvyh.lightframe.transaction.common.domain.RPCErrorInfo;
import com.lvyh.lightframe.transaction.common.domain.Transaction;
import com.lvyh.lightframe.transaction.common.enums.TransactionEventType;
import com.lvyh.lightframe.transaction.common.enums.TransactionRole;
import com.lvyh.lightframe.transaction.common.enums.TransactionStatus;
import com.lvyh.lightframe.transaction.common.thread.TransactionThreadLocal;
import com.lvyh.lightframe.transaction.core.event.TransactionEventProducer;
import com.lvyh.lightframe.transaction.core.service.TransactionMessageService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;

/**
 * Provide transaction control processing support
 */
@Component
@Slf4j
public class TransactionCoordinatorSupport {
    private static final Logger logger = LoggerFactory.getLogger(TransactionCoordinatorSupport.class);

    @Autowired
    private TransactionEventProducer transactionEventProducer;

    @Autowired
    private TransactionMessageService transactionMessageService;

    /**
     * Open transaction
     */
    public void begin(ProceedingJoinPoint point) {
        //Create a new transaction environment context object
        TransactionContext transactionContext = new TransactionContext();
        //Set the transaction context role to "transaction initiator"
        transactionContext.setRole(TransactionRole.START.getCode());
        //Build a transaction log object
        Transaction transaction = buildTransaction(point, transactionContext, TransactionStatus.BEGIN.getCode());
        //Publish events that insert transaction log objects
        transactionEventProducer.publish(transaction, TransactionEventType.INSERT.getCode());
        //Store the current transaction log object in ThreadLocal
        TransactionThreadLocal.TRANSACTION_THREADLOCAL.set(transaction);
        //Store the transaction environment context object in ThreadLocal
        TransactionThreadLocal.CONTEXT_THREADLOCAL.set(transactionContext);
    }

    /**
     * participate transaction
     */
    public void participate(ProceedingJoinPoint point, TransactionContext transactionContext) {
        //Modify the role setting of the transaction environment context transmitted from the remote to "transaction participant"
        transactionContext.setRole(TransactionRole.ACTOR.getCode());
        //Build a transaction log object
        Transaction transaction = buildTransaction(point, transactionContext, TransactionStatus.BEGIN.getCode());
        //Publish events that insert transaction log objects
        transactionEventProducer.publish(transaction, TransactionEventType.INSERT.getCode());
        //Store the current transaction log object in ThreadLocal
        TransactionThreadLocal.TRANSACTION_THREADLOCAL.set(transaction);
        //Store the transaction environment context object in ThreadLocal
        TransactionThreadLocal.CONTEXT_THREADLOCAL.set(transactionContext);
    }

    private Transaction buildTransaction(ProceedingJoinPoint point, TransactionContext transactionContext, Integer status) {

        Class<?>[] interfaces = point.getTarget().getClass().getInterfaces();
        Class<?> clazz = Stream.of(interfaces).filter(i -> i.getSimpleName().contains("Service")).findFirst().get();
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        logger.info("[TransactionCoordinatorSupport] build transaction, clazz:{}, method:{}, role:{}", clazz.getName(), method.getName(), transactionContext.getRole());

        Transaction transaction = new Transaction();
        if (StringUtils.hasLength(transactionContext.getTransactionId()))
            //If the current service is a transaction participant,
            // you do not need to create a new transaction ID. you can use the ID of the transaction initiator
            transaction.setTransactionId(transactionContext.getTransactionId());
        else {
            //If the current service is the transaction initiator,
            // you need to create a new transaction ID and set it to the transaction environment context object
            transaction.setTransactionId(String.valueOf(UUID.randomUUID().hashCode() & 0x7fffffff));
            transactionContext.setTransactionId(transaction.getTransactionId());
        }
        transaction.setRole(transactionContext.getRole());
        transaction.setStatus(status);
        transaction.setTargetClass(clazz.getName());
        transaction.setTargetMethod(method.getName());
        return transaction;
    }

    /**
     * Add a transaction participant
     */
    public void addParticipant(Participant participant) {
        //Add the transaction participant to the current transaction
        Transaction transaction = TransactionThreadLocal.TRANSACTION_THREADLOCAL.get();
        transaction.getParticipants().add(participant);
        //Publish events that modify the transaction log
        transactionEventProducer.publish(transaction, TransactionEventType.UPDATE.getCode());
    }

    /**
     * Commit transaction
     */
    public void commit() {
        Transaction transaction = TransactionThreadLocal.TRANSACTION_THREADLOCAL.get();
        //Before committing the transaction, check whether there is an error when calling the RPC interface
        //If an error is reported, the transaction status is failure, otherwise it is commit
        RPCErrorInfo rpcErrorInfo = TransactionThreadLocal.RPC_ERROR_INFO_THREADLOCAL.get();
        if (Objects.nonNull(rpcErrorInfo) && Objects.equals(rpcErrorInfo.getCode(), 1)) {
            transaction.setStatus(TransactionStatus.FAILURE.getCode());
            transaction.setErrorMessage(rpcErrorInfo.getErrorMsg());
        } else {
            transaction.setStatus(TransactionStatus.COMMIT.getCode());
        }
        //Publish events that modify the transaction log
        transactionEventProducer.publish(transaction, TransactionEventType.UPDATE.getCode());
    }

    /**
     * Transaction error
     */
    public void fail(Throwable throwable) {
        Transaction transaction = TransactionThreadLocal.TRANSACTION_THREADLOCAL.get();
        transaction.setStatus(TransactionStatus.FAILURE.getCode());
        transaction.setErrorMessage(throwable.getMessage());
        transactionEventProducer.publish(transaction, TransactionEventType.UPDATE.getCode());
    }


    /**
     * Send messages
     * <p>
     * The message body of the MQ message is: details of each transaction participant in the transaction initiator;
     * After the transaction initiator method is executed, a transaction information is obtained, which is transmitted to the transaction coordinator and saved in the transaction log;
     * <p>
     * The transaction coordinator will traverse the list of participants in the transaction. Each time it traverses a transaction participant, it will send a message to the MQ message location that the participant listens to.
     * <p>
     * The core information of the message body is: "participant's business method";
     * In this way, if the participant reports an error in the process of executing the business, he can also get the message in MQ and re execute the business method.
     */
    public void sendMessage() {
        Transaction transaction = TransactionThreadLocal.TRANSACTION_THREADLOCAL.get();
        logger.info("[TransactionCoordinatorSupport] transaction participants:{}", !CollectionUtils.isEmpty(transaction.getParticipants()) ? transaction.getParticipants().size() : "");
        if (!CollectionUtils.isEmpty(transaction.getParticipants())) {
            for (Participant participant : transaction.getParticipants()) {
                transactionMessageService.sendMessage(participant);
            }
        }
    }

}



















